# Параллелизм с Cats Effect

[Перевод статьи "Concurrency with Cats Effect", написанной Krzysztof Atłasik][Article]

Параллелизм — это сложно. Brian Goetz в книге "Java Concurrency in Practice" пишет:

> Писать правильные программы трудно; писать правильные параллельные программы сложнее. 
> Просто в параллельной программе может произойти больше ошибок, чем в последовательной.

К нашему преимуществу, в Scala есть много замечательных инструментов, 
помогающих "укротить" параллельное программирование, 
таких как [Akka](https://akka.io), [ZIO](https://zio.dev), [Finagle](https://twitter.github.io/finagle/guide/index.html) 
или [Cats Effect](https://typelevel.org/cats-effect) (далее CE). 
В этой статье сосредоточимся на функциях, предлагаемых последней библиотекой.

## Что такое fiber?

Основным строительным блоком параллелизма CE является _"волокно"_ (fiber). 
В двух словах, волокно — это легковесный логический поток, который представляет собой последовательность действий. 
В случае CE это список операций, приостановленных с помощью монады IO и упорядоченных с помощью flatMap.

Волокна занимают меньше места по сравнению с системными потоками. 
Самое главное, они довольно дешевы с точки зрения использования памяти. 
Следовательно, они не такие дефицитные ресурсы, как потоки. 
Нам не нужно создавать пулы волокон, мы можем просто создать новый, когда нам это нужно.

В отличие от системных потоков, волокна не полагаются на планировщик операционной системы для переключения между контекстами. 
Вместо этого они используют планировщик уровня инфраструктуры, который контролирует, когда и как они выполняются. 
Одно волокно не привязано к конкретному потоку. 
Среда выполнения Cats Effect может многократно создавать десятки тысяч волокон в несколько системных потоков. 
В JVM среда выполнения обычно использует пул потоков фиксированного размера, привязанный к ЦП, 
но для однопоточных сред, таких как Scala.js, она может работать даже в одном потоке.

![fibers](https://softwaremill.com/user/pages/blog/48.concurrency-with-cats-effect/image1.png?g-225826a2)

## Управление волокнами

В CE можно разветвить вычисления с помощью метода `start`. 
Он возвращает экземпляр типа `Fiber`, который предоставляет доступ к методу `join`.
Вызов `join` заставляет волокно стороны вызова ждать завершения присоединенного волокна.

```dotty
import cats.effect.*
import scala.concurrent.duration.*

val task =
  for
    _ <- IO.println("Task started")
    _ <- IO.sleep(1.second)
    _ <- IO.println("Task completed")
  yield ()

for
  fiber <- task.start
  _ <- IO.println("Hello from main fiber")
  // ждем 1 секунду до завершения разветвленного волокна
  _ <- fiber.join
yield ()
```

[Рассмотренный пример в Scastie](https://scastie.scala-lang.org/W1qDRO72QlaDn40qQ0A5lw)

Другой метод, доступный на `Fiber`, - `cancel`. 
Он позволяет аннулировать волокно вместе с вычислениями, выполняемыми на нем. 
Метод `cancel` не вернет управление до тех пор, пока все ресурсы, выделенные в волокне, 
не будут должным образом завершены, а все базовые операции не будут отменены.

С помощью `onCancel(fin: IO[Unit])` можно настроить финализатор, который будет запускаться при отмене задачи.

```dotty
import cats.effect.*
import scala.concurrent.duration.*

val task =
  for
    _ <- IO.sleep(5.seconds)
    _ <- IO.println("Computations complete")
  yield ()

for
  fiber <- task.onCancel(IO.println("Task canceled!")).start
  _ <- IO.sleep(1.second)
  // волокно завершилось бы через 5 секунд, но оно отменено через 1 секунду
  _ <- fiber.cancel
yield ()
```

[Рассмотренный пример в Scastie](https://scastie.scala-lang.org/DpUgfL8bTdaX1zkR9JiLxg)

Метод `join` возвращает значение типа `Outcome`, имеющее следующие подтипы: `Succeeded`, `Errored` и `Canceled`. 
Можно выполнить сопоставление с шаблоном для результата, чтобы проверить завершенный статус волокна.

Значение, возвращаемое волокном, будет доступно в качестве поля результата `Succeeded`. 
Таким образом, можно передавать значения из разветвленной задачи на сторону вызова.

```dotty
import cats.effect.*
import scala.concurrent.duration.*

val task = IO.sleep(500.millis) *> IO.pure("Hello world!")

for
  fiber <- task.start
  _ <- fiber.join.flatMap {
    case Outcome.Succeeded(value) =>
      value.flatMap(v => IO.println(s"Computed value: $v"))
    case Outcome.Errored(e) => IO.println(s"Error: $e")
    case Outcome.Canceled() => IO.println("Task was canceled!")
  }
yield ()
```

[Рассмотренный пример в Scastie](https://scastie.scala-lang.org/LglCtHUsSZeOxipO6OxQ2Q)

Есть другой вариант метода `join`: `joinWith(onCancel: F[A])`. 
Он позволяет указывать резервное вычисление в случае отмены волокна 
и имеет два специализированных варианта: `joinWithNever` - сокращение для `joinWith(IO.never)`, 
и `joinWithUnit` - сокращение для `joinWith(IO.unit)`.

## Тайм-ауты

Можно использовать метод `timeout`, чтобы указать период, по истечении которого волокно будет отменено.

```dotty
task
  .onCancel(IO.println("Task has timed out!"))
  .timeout(500.millis)
  .start
```

Метод `timeoutTo` позволяет указать резервный вариант для отмененных вычислений.

```dotty
task
  .onCancel(IO.println("Task has timed out!"))
  .timeoutTo(500.millis, planB)
  .start
```

Подобно `cancel` методы тайм-аута всегда будут ждать, пока все основные ресурсы, выделенные в волокне, 
не будут завершены должным образом. Ожидание можно пропустить с помощью `timeoutAndForget`. 
После вызова он немедленно возвращает управление стороне вызова и асинхронно запускает финализаторы. 
Имейте в виду, что если финализаторы никогда не завершатся, волокно будет протекать.

## Структурированный параллелизм

Если волокно не присоединено или не отменено, оно будет проходить параллельно волокну места вызова. 
Волокно завершится только в том случае, если его вычисления будут завершены или вызовут ошибку. 
Если вычисления, выполняемые на волокне, никогда не заканчиваются, волокно никогда не завершится.

```dotty
val task =
  for
    time <- IO.realTimeInstant
    _ <- IO.println(f"Current date and time is $time")
    _ <- IO.sleep(1.second)
  yield ()

for
  _ <- task.foreverM.start // задача запущена как бесконечный цикл
  // и волокно никогда не завершится
  _ <- IO.sleep(5.seconds)
  _ <- IO.println("Bye")
yield ()
```

Это может привести к утечке ресурсов. 
Волокна довольно легковесны, но если создать достаточное их количество, 
это все равно может привести к истощению доступной памяти.

Такие методы, как `start` и `join` являются примитивами параллелизма CE. 
Это удобные инструменты, но их следует использовать с осторожностью. 
В противном случае можно легко добиться утечки памяти или вызвать тупиковые ситуации.

Чтобы предотвратить утечку, мы должны либо явно отменить избыточные волокна, 
либо использовать более безопасные методы, такие как `background`. 
Метод `background` связывает жизненный цикл волокна с типом данных `Resource`. 
Всякий раз, когда ресурс завершается, он также отменяет базовое волокно.

```dotty
val backgroundTask: Resource[IO, Unit] = task
  .onCancel(IO.println("Closing the background task"))
  .background

backgroundTask.surround {
  for
    _ <- IO.sleep(5.seconds)
    _ <- IO.println("Bye") // после завершения первоочередной задачи
                           // фоновое волокно будет отменено
  yield ()
}
```

Статус волокна соответствует синтаксической структуре кода. 
Такой способ управления жизненным циклом волокон называется _структурным параллелизмом_.

Еще один инструмент структурированного параллелизма, предлагаемый CE, — это `Supervisor`.
`Supervisor` — это ресурс, способный порождать волокна, чьи жизненные циклы будет связаны с его жизненным циклом. 
Всякий раз, когда супервизор завершается, он также обязательно завершает все свои дочерние волокна.

`Supervisor` поставляется с двумя режимами. 
Мы можем выбрать режим, передав флаг `await` при создании ресурса. 
Если `await` - `false`, то супервизор отменит все свои активные потоки, когда завершится. Это значение по умолчанию. 
Если `await` - `true`, завершение супервизора будет блокироваться до тех пор, пока не будут завершены все волокна.

```dotty
Supervisor[IO](await = false).use { supervisor =>
  for
    _ <- supervisor.supervise(
      IO.println("A").andWait(1.second).foreverM
    )
    _ <- supervisor.supervise(
      IO.println("B").andWait(2.second).foreverM
    )
    _ <- IO.sleep(5.seconds) // волокна будут печатать A и B
                             // в течении 5 секунд, а затем будут завершены
  yield ()
}
```

Добавив предложение импорта `import cats.effect.implicits.*`, 
мы также можем добавить в область видимости метод расширения `supervise` для `IO`. 
Это позволит вызывать `supervise` аналогично тому, как вызывается `start`.

```dotty
import cats.effect.implicits.*

Supervisor[IO](await = true).use { supervisor =>
  IO.println("Hello")
    .andWait(1.second)
    .foreverM
    .timeout(5.seconds)
    .supervise(supervisor)
}
```

## Операторы параллелизма

CE предоставляет множество удобных высокоуровневых абстракций для общих операций, связанных с параллелизмом.

Например, с `IO.race` можно взять два `IO`, запустить их параллельно и дождаться более быстрого. 
Более медленное вычисление будет отменено. Тип результата `race` - `IO[Either[A, B]]`.

```dotty
val raced: IO[Either[String, Int]] = IO.race(
  IO("Faster").delayBy(5.seconds),
  IO(999).delayBy(10.seconds)
)
```

[Рассмотренный пример в Scastie](https://scastie.scala-lang.org/CA02wdioQo6v2xKzaWNYcg)

Метод `IO.racePair` дает больше контроля над тем, как справляться с потерей волокна. 
Он возвращает как более быструю задачу в виде `Outcome`, так и экземпляр объекта `Fiber`, 
который можно использовать для отмены или присоединения более медленного волокна. 
Поначалу возвращаемый тип может показаться немного запутанным: 
`Either[(OutcomeIO[A], FiberIO[B]), (FiberIO[A, OutcomeIO[B])]`.
Если левая задача "побеждает", то мы получаем левое значение `Either`: 
кортеж, содержащий результат левого волокна и дескриптор волокна для правого. 
Если "выигрывает" правый, то получаем правое значение, на этот раз с результатом правого волокна и дескриптором левого.

```dotty
IO.racePair(IO(10), IO(5).delayBy(2.seconds)).flatMap {
 case Left((resL, fibR))  => 
   fibR.cancel *> IO(s"Left fiber outcome: $resL")
 case Right((fibL, resR)) => 
   fibL.cancel *> IO(s"Right fiber outcome: $resR")
}
```

[Рассмотренный пример в Scastie](https://scastie.scala-lang.org/3lqHHMwWTNOEUgHNrHMh1Q)

Если нам нужно запустить два вычисления параллельно и дождаться их завершения, у нас есть несколько вариантов. 
Метод `IO.both` возвращает кортеж, содержащий оба результата: `IO[(A, B)]`. 
Если какая-либо из операций отменена или завершилась с ошибкой, вторая также будет отменена.

```dotty
IO.both(IO(1).delayBy(1.second), IO("Hello"))
```

`IO.bothOutcome` не прерывает другое волокно, даже если первое прекращено. 
Он возвращает кортеж, содержащий результаты обоих волокон.

Библиотека Cats предоставляет множество операторов, позволяющих параллельно выполнять эффекты. 
Они реализованы как методы расширения, поэтому для их использования нужно сделать соответствующий импорт, 
который внесет в область видимости правильные имплициты.

```dotty
import cats.syntax.all.*
```

Метод `parTupled` очень похож на `IO.both`. 
Его можно вызвать для кортежа из двух `IO`, и оба будут выполняться параллельно. Тип результата `IO[(A, B)]`. 
Семантика этого метода такая же, как и у `tupled`, с той лишь разницей, что эффект `tupled` выполняется последовательно.

Если необходимо выполнить произвольное количество эффектов одновременно, 
то можно использовать параллельные аналоги `traverse` и `sequence`: `parTraverse` и `parSequence`.

```dotty
val r: IO[List[Int]] = List(
  IO(1).delayBy(1.second),
  IO(2).delayBy(2.seconds),
  IO(3).delayBy(3.seconds)
).parSequence
```

[Рассмотренный пример в Scastie](https://scastie.scala-lang.org/kOE1QhM5QGyESu3KYHNfqg)

Можно ограничить количество одновременно выполняемых задач с помощью вариантов `parTraverseN` и `parSequenceN`. 
Они принимают дополнительный `Int` параметр, определяющий максимальный параллелизм.

По умолчанию оба `parTraverse` и `parSequence` сохраняют порядок задач, 
что может привести к небольшому снижению производительности. 
С другой стороны, `parTraverseUnordered` и `parSequenceUnordered` не соблюдают порядок, 
что потенциально может ускорить их выполнение.

И последнее, но не менее важное: мы можем использовать два полезных символических оператора: `&>` и `<&`. 
Они также позволяют выполнять два эффекта параллельно. Неудача в одном из `IO` отменит и другой. 
Если всё вычисление отменено, оба действия будут отменены. Оператор `&>` возвращает значение справа, `<&` - слева.

```dotty
// Получим 1 приблизительно через 2 секунды
IO(1).delayBy(1.second) <& IO(2).delayBy(2.seconds)
```

Об этих операторах можно думать как об аналогах последовательных `*>`, `<*`, `<<` и `>>`.

---

**Ссылки:**

- [Оригинал статьи - Concurrency with Cats Effect][Article]

[Article]: https://softwaremill.com/concurrency-with-cats-effect
